/**
 * Tests the compatibility of resume tokens across server version upgrade / downgrade.
 * @tags: [uses_change_streams, requires_replication, requires_fcv_80]
 */

import "jstests/multiVersion/libs/multi_cluster.js";

import {
    assertCreateCollection,
    assertDropCollection
} from "jstests/libs/collection_drop_recreate.js";
import {
    decodeResumeToken,
    highWaterMarkResumeTokenType
} from "jstests/libs/query/change_stream_util.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";

// Start a sharded cluster with every component (mongos, config server, shard replica sets) on
// the 'last-lts' binary version; the test later upgrades it to 'latest' and back again.
const st = new ShardingTest({
    shards: 2,
    config: 1,
    other: {
        mongosOptions: {binVersion: "last-lts"},
        configOptions: {
            binVersion: "last-lts",
        },
        rsOptions: {
            binVersion: "last-lts",
        },
        rs: {
            nodes: 2,
        }
    }
});

// Names of the namespaces the test creates and drops while generating change events.
const collName = "test";
const viewName = "testView";
const timeseriesName = "ts";

// 'testDB'/'testColl' are re-fetched after each upgrade/downgrade, hence 'let'.
let testDB = st.s.getDB(jsTestName());
let testColl = testDB[collName];

// Record high-watermark time marking the start point of the test.
const testStartOperationTime = testDB.hello().$clusterTime.clusterTime;

// An array which will list the expected sequence of change events generated by the test.
const expectedEvents = [];

//
// Below, we generate one of each type of change event so that we can later test resuming from the
// token representing such event.
//
// NOTE: the order of operations here must exactly match the order of entries pushed onto
// 'expectedEvents' — the verification loops below walk both in lockstep.
testColl = assertCreateCollection(testDB, testColl.getName());
expectedEvents.push({operationType: "create"});

// Each index built produces its own "createIndexes" event.
assert.commandWorked(testColl.createIndexes([{shard: 1}, {shard: 1, _id: 1}, {value: 1}]));
expectedEvents.push({operationType: "createIndexes"},
                    {operationType: "createIndexes"},
                    {operationType: "createIndexes"});

// Shard the test collection and split it into two chunks: one that contains all {shard: 1}
// documents and one that contains all {shard: 2} documents.
st.shardColl(testColl, {shard: 1} /* shard key */, {shard: 2} /* split at */);
expectedEvents.push({operationType: "shardCollection"});

// Inserts targeting both chunks, so events originate from both shards.
assert.commandWorked(testColl.insertMany([
    {_id: "a", shard: 1, value: ""},
    {_id: "b", shard: 2, value: ""},
    {_id: "c", shard: 2, value: ""}
]));
expectedEvents.push({operationType: "insert", documentKey: {shard: 1, _id: "a"}},
                    {operationType: "insert", documentKey: {shard: 2, _id: "b"}},
                    {operationType: "insert", documentKey: {shard: 2, _id: "c"}});

// One "update" event per shard.
assert.commandWorked(testColl.update({_id: "a", shard: 1}, {$set: {value: "x"}}));
expectedEvents.push({operationType: "update", documentKey: {_id: "a", shard: 1}});

assert.commandWorked(testColl.update({_id: "b", shard: 2}, {$set: {value: "x"}}));
expectedEvents.push({operationType: "update", documentKey: {_id: "b", shard: 2}});

// One "replace" event per shard.
assert.commandWorked(testColl.replaceOne({_id: "a", shard: 1}, {_id: "a", shard: 1, value: "y"}));
expectedEvents.push({operationType: "replace", documentKey: {_id: "a", shard: 1}});

assert.commandWorked(testColl.replaceOne({_id: "b", shard: 2}, {_id: "b", shard: 2, value: "y"}));
expectedEvents.push({operationType: "replace", documentKey: {_id: "b", shard: 2}});

// One "delete" event per shard.
assert.commandWorked(testColl.remove({_id: "a"}));
expectedEvents.push({operationType: "delete", documentKey: {_id: "a", shard: 1}});

assert.commandWorked(testColl.remove({_id: "b"}));
expectedEvents.push({operationType: "delete", documentKey: {_id: "b", shard: 2}});

// DDL events: refine the shard key, then reshard onto a different key.
assert.commandWorked(
    st.s.adminCommand({refineCollectionShardKey: testColl.getFullName(), key: {shard: 1, _id: 1}}));
expectedEvents.push({operationType: "refineCollectionShardKey"});

assert.commandWorked(st.s.adminCommand(
    {reshardCollection: testColl.getFullName(), key: {_id: 1}, numInitialChunks: 2}));
expectedEvents.push({operationType: "reshardCollection"});

// Dropping the index yields one "dropIndexes" event per shard that holds data; the dedup
// workaround below (SERVER-90023) may later collapse these into one expected event.
assert.commandWorked(testColl.dropIndex({value: 1}));
expectedEvents.push({operationType: "dropIndexes"}, {operationType: "dropIndexes"});

// Create view on a collection.
assert.commandWorked(
    testDB.runCommand({create: viewName, viewOn: collName, pipeline: [{$match: {foo: "bar"}}]}));
expectedEvents.push({operationType: "create"});

assert.commandWorked(testDB.runCommand({drop: viewName}));
expectedEvents.push({operationType: "drop"});

// Create timeseries collection.
assert.commandWorked(testDB.runCommand({create: timeseriesName, timeseries: {timeField: "t"}}));
expectedEvents.push({operationType: "create"});

assert.commandWorked(testDB.runCommand({drop: timeseriesName}));
expectedEvents.push({operationType: "drop"});

// Rename and then drop the main test collection.
const newTestCollectionName = "test_";
assert.commandWorked(testColl.renameCollection(newTestCollectionName));
expectedEvents.push({operationType: "rename"});

assertDropCollection(testDB, newTestCollectionName);
expectedEvents.push({operationType: "drop"});

assert.commandWorked(testDB.dropDatabase());
// A whole-DB stream will be invalidated by the dropDatabase event. We include a second dropDatabase
// event because one such event is generated on each shard, and will be reported if we resume after
// the invalidate. This second dropDatabase acts as a sentinel here, signifying that we have reached
// the end of the test stream.
expectedEvents.push({operationType: "dropDatabase"},
                    {operationType: "invalidate"},
                    {operationType: "dropDatabase"});

// Leave only one of two "dropIndexes" events when they have identical resume tokens, because the
// second event will be skipped when resuming from the first event's token in such a case.
// TODO SERVER-90023: Remove this workaround when no longer needed.
//
// This scope reads the whole stream on the 'last-lts' binary and records one resume token per
// event into 'resumeTokensLastLTS'. Note the index offset: resumeTokensLastLTS[0] is the initial
// high-watermark token, so resumeTokensLastLTS[i + 1] is the token of expectedEvents[i].
let resumeTokensLastLTS = [];
{
    const csCursor = testDB.watch([], {
        showExpandedEvents: true,
        startAtOperationTime: testStartOperationTime,
        // batchSize 0 so the initial response carries no events, letting us grab a
        // high-watermark token before consuming anything.
        batchSize: 0,
    });
    const hwmToken = csCursor.getResumeToken();
    assert.eq(decodeResumeToken(hwmToken).tokenType,
              highWaterMarkResumeTokenType,
              "expected a high-watermark token");
    resumeTokensLastLTS.push(hwmToken);

    const dropIndexesEvents = [];
    // Fetch all events and compare to the expected ones. Note that we need to exclude the second
    // dropDatabase sentinel event here.
    for (let i = 0; i < expectedEvents.length - 1; ++i) {
        assert.soon(() => csCursor.hasNext());
        const event = csCursor.next();
        assert.eq(expectedEvents[i].operationType, event.operationType);
        if (event.operationType === "dropIndexes") {
            dropIndexesEvents.push(event);
        }
        resumeTokensLastLTS.push(event._id);
    }
    csCursor.close();
    assert.eq(2, dropIndexesEvents.length, "unexpected number of 'dropIndexes' events");
    if (bsonWoCompare(dropIndexesEvents[0]._id, dropIndexesEvents[1]._id) === 0) {
        // Remove the duplicate from both arrays; the token array is offset by one because of
        // the leading high-watermark token.
        const dropIndexPosition =
            expectedEvents.findIndex((event) => (event.operationType === "dropIndexes"));
        expectedEvents.splice(dropIndexPosition, 1);
        resumeTokensLastLTS.splice(dropIndexPosition + 1, 1);
    }
}

// Helper that verifies every field listed in 'expectedEvent' against the corresponding field of
// 'event'; fields of 'event' not mentioned in 'expectedEvent' are ignored.
function assertEventMatches(event, expectedEvent, errorMsg) {
    for (const [field, expectedValue] of Object.entries(expectedEvent)) {
        assert.docEq(
            expectedValue, event[field], errorMsg + `: value mismatch for field '${field}'`);
    }
}

// Asserts that resuming from all collected resumeTokens produces the expected events.
// resumeTokensLastLTS[i] is the token of expectedEvents[i - 1] (index 0 holds the initial
// high-watermark token), so resuming with 'startAfter' on token i must yield expectedEvents[i].
// The final token is never resumed from: the last expected event is only a sentinel marking the
// end of the test stream.
function assertTokenResumability() {
    for (let i = 1; i < expectedEvents.length; ++i) {
        // Fixed: the original built an unused 'options' object here; the real options are
        // passed inline below.
        const csCursor =
            testDB.watch([], {showExpandedEvents: true, startAfter: resumeTokensLastLTS[i]});
        const errorMsg =
            "could not retrieve the expected event matching " + tojson(expectedEvents[i]);
        assert.soon(() => csCursor.hasNext());
        const event = csCursor.next();
        assertEventMatches(event, expectedEvents[i], errorMsg);
        csCursor.close();
    }
}

// Upgrade the cluster to 'latest' to test resumability. Binaries first, then FCV, per the
// standard upgrade procedure.
st.upgradeCluster("latest", {waitUntilStable: true});
assert.commandWorked(st.s.adminCommand({setFeatureCompatibilityVersion: latestFCV, confirm: true}));

// Re-fetch the DB handle: the mongos was restarted during the upgrade.
testDB = st.s.getDB(jsTestName());

// Verify that we can resume from each of the LTS tokens on the new binary version.
// When resuming from (i-1)-th event's token we expect to get the i-th event. We do not need to test
// the last token, because it is simply a sentinel value that signifies the end of the test.
assertTokenResumability();

// Downgrade back to the original version. FCV must be lowered before downgrading binaries.
assert.commandWorked(
    st.s.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV, confirm: true}));
st.downgradeCluster("last-lts", {waitUntilStable: true});

// Re-fetch the DB handle again after the downgrade restart.
testDB = st.s.getDB(jsTestName());

// Verify the tokens still resume correctly after the round-trip back to 'last-lts'.
assertTokenResumability();

st.stop();
